Skip to content

AutoShardClient

class AutoShardedClient (Client)

A client to automatically shard the bot.

You can optionally specify the total number of shards to start with, or it will be determined automatically.

Source code in naff/client/auto_shard_client.py
class AutoShardedClient(Client):
    """
    A client to automatically shard the bot.

    You can optionally specify the total number of shards to start with, or it will be determined automatically.
    """

    def __init__(self, *args, **kwargs) -> None:
        if "total_shards" not in kwargs:
            self.auto_sharding = True
        else:
            self.auto_sharding = False

        super().__init__(*args, **kwargs)

        self._connection_state = None

        self._connection_states: list[ConnectionState] = []

        self.max_start_concurrency: int = 1

    @property
    def gateway_started(self) -> bool:
        """Returns if the gateway has been started in all shards."""
        return all(state.gateway_started.is_set() for state in self._connection_states)

    @property
    def shards(self) -> list[ConnectionState]:
        """Returns a list of all shards currently in use."""
        return self._connection_states

    @property
    def latency(self) -> float:
        """The average latency of all active gateways."""
        if len(self._connection_states):
            latencies = sum((g.latency for g in self._connection_states))
            return latencies / len(self._connection_states)
        else:
            return float("inf")

    @property
    def latencies(self) -> dict[int, float]:
        """
        Return a dictionary of latencies for all shards.

        Returns:
            {shard_id: latency}
        """
        return {state.shard_id: state.latency for state in self._connection_states}

    async def stop(self) -> None:
        """Shutdown the bot."""
        logger.debug("Stopping the bot.")
        self._ready.clear()
        await self.http.close()
        await asyncio.gather(*(state.stop() for state in self._connection_states))

    def get_guild_websocket(self, guild_id: "Snowflake_Type") -> GatewayClient:
        """
        Get the appropriate websocket for a given guild

        Args:
            guild_id: The ID of the guild

        Returns:
            A gateway client for the given ID
        """
        shard_id = (guild_id >> 22) % self.total_shards
        return next((state for state in self._connection_states if state.shard_id == shard_id), MISSING).gateway

    def get_shards_guild(self, shard_id: int) -> list[Guild]:
        """
        Returns the guilds that the specified shard can see

        Args:
            shard_id: The ID of the shard

        Returns:
            A list of guilds
        """
        return [guild for key, guild in self.cache.guild_cache.items() if ((key >> 22) % self.total_shards) == shard_id]

    @Listener.create()
    async def _on_websocket_ready(self, event: events.RawGatewayEvent) -> None:
        """
        Catches websocket ready and determines when to dispatch the client `READY` signal.

        Args:
            event: The websocket ready packet
        """
        connection_data = event.data
        expected_guilds = {to_snowflake(guild["id"]) for guild in connection_data["guilds"]}
        shard_id, total_shards = connection_data["shard"]
        connection_state = next((state for state in self._connection_states if state.shard_id == shard_id), None)

        if len(expected_guilds) != 0:
            while True:
                try:
                    await asyncio.wait_for(self._guild_event.wait(), self.guild_event_timeout)
                except asyncio.TimeoutError:
                    logger.warning("Timeout waiting for guilds cache: Not all guilds will be in cache")
                    break
                self._guild_event.clear()
                if all(self.cache.get_guild(g_id) is not None for g_id in expected_guilds):
                    # all guilds cached
                    break

            if self.fetch_members:
                logger.info(f"Shard {shard_id} is waiting for members to be chunked")
                await asyncio.gather(*(guild.chunked.wait() for guild in self.guilds if guild.id in expected_guilds))
        else:
            logger.warning(
                f"Shard {shard_id} reports it has 0 guilds, this is an indicator you may be using too many shards"
            )
        # noinspection PyProtectedMember
        connection_state._shard_ready.set()
        self.dispatch(ShardConnect(shard_id))
        logger.debug(f"Shard {shard_id} is now ready")

        # noinspection PyProtectedMember
        await asyncio.gather(*[shard._shard_ready.wait() for shard in self._connection_states])

        # run any pending startup tasks
        if self.async_startup_tasks:
            try:
                await asyncio.gather(*self.async_startup_tasks)
            except Exception as e:
                self.dispatch(events.Error("async-extension-loader", e))

        # cache slash commands
        if not self._startup:
            await self._init_interactions()

        if not self._ready.is_set():
            self._ready.set()
            if not self._startup:
                self._startup = True
                self.dispatch(events.Startup())
            self.dispatch(events.Ready())

    async def astart(self, token) -> None:
        """
        Asynchronous method to start the bot.

        Args:
            token: Your bot's token
        """
        logger.debug("Starting http client...")
        await self.login(token)

        tasks = []

        # Sort shards into their respective ratelimit buckets
        shard_buckets = defaultdict(list)
        for shard in self._connection_states:
            bucket = str(shard.shard_id % self.max_start_concurrency)
            shard_buckets[bucket].append(shard)

        for bucket in shard_buckets.values():
            for shard in bucket:
                logger.debug(f"Starting {shard.shard_id}")
                start = time.perf_counter()
                tasks.append(asyncio.create_task(shard.start()))

                if self.max_start_concurrency == 1:
                    # connection ratelimiting when discord has asked for one connection concurrently
                    # noinspection PyProtectedMember
                    await shard._shard_ready.wait()
                    await asyncio.sleep(5.1 - (time.perf_counter() - start))

            # wait for shards to finish starting
            # noinspection PyProtectedMember
            await asyncio.gather(*[shard._shard_ready.wait() for shard in self._connection_states])

        try:
            await asyncio.gather(*tasks)
        finally:
            await self.stop()

    async def login(self, token) -> None:
        """
        Login to discord via http.

        !!! note
            You will need to run Naff.start_gateway() before you start receiving gateway events.

        Args:
            token str: Your bot's token

        """
        await super().login(token)
        data = await self.http.get_gateway_bot()

        self.max_start_concurrency = data["session_start_limit"]["max_concurrency"]
        if self.auto_sharding:
            self.total_shards = data["shards"]
        elif data["shards"] != self.total_shards:
            recommended_shards = data["shards"]
            logger.info(
                f"Discord recommends you start with {recommended_shards} shard{'s' if recommended_shards != 1 else ''} instead of {self.total_shards}"
            )

        logger.debug(f"Starting bot with {self.total_shards} shard{'s' if self.total_shards != 1 else ''}")
        self._connection_states: list[ConnectionState] = [
            ConnectionState(self, self.intents, shard_id) for shard_id in range(self.total_shards)
        ]

property readonly gateway_started: bool

Returns if the gateway has been started in all shards.

property readonly shards: list

Returns a list of all shards currently in use.

property readonly latency: float

The average latency of all active gateways.

property readonly latencies: dict

Return a dictionary of latencies for all shards.

Returns:

Type Description
{shard_id

latency}

async method stop(self)

Shutdown the bot.

Source code in naff/client/auto_shard_client.py
async def stop(self) -> None:
    """Shutdown the bot."""
    logger.debug("Stopping the bot.")
    self._ready.clear()
    await self.http.close()
    await asyncio.gather(*(state.stop() for state in self._connection_states))

method get_guild_websocket(self, guild_id)

Get the appropriate websocket for a given guild

Parameters:

Name Type Description Default
guild_id Snowflake_Type

The ID of the guild

required

Returns:

Type Description
GatewayClient

A gateway client for the given ID

Source code in naff/client/auto_shard_client.py
def get_guild_websocket(self, guild_id: "Snowflake_Type") -> GatewayClient:
    """
    Get the appropriate websocket for a given guild

    Args:
        guild_id: The ID of the guild

    Returns:
        A gateway client for the given ID
    """
    shard_id = (guild_id >> 22) % self.total_shards
    return next((state for state in self._connection_states if state.shard_id == shard_id), MISSING).gateway

method get_shards_guild(self, shard_id)

Returns the guilds that the specified shard can see

Parameters:

Name Type Description Default
shard_id int

The ID of the shard

required

Returns:

Type Description
list

A list of guilds

Source code in naff/client/auto_shard_client.py
def get_shards_guild(self, shard_id: int) -> list[Guild]:
    """
    Returns the guilds that the specified shard can see

    Args:
        shard_id: The ID of the shard

    Returns:
        A list of guilds
    """
    return [guild for key, guild in self.cache.guild_cache.items() if ((key >> 22) % self.total_shards) == shard_id]

async method astart(self, token)

Asynchronous method to start the bot.

Parameters:

Name Type Description Default
token

Your bot's token

required
Source code in naff/client/auto_shard_client.py
async def astart(self, token) -> None:
    """
    Asynchronous method to start the bot.

    Args:
        token: Your bot's token
    """
    logger.debug("Starting http client...")
    await self.login(token)

    tasks = []

    # Sort shards into their respective ratelimit buckets
    shard_buckets = defaultdict(list)
    for shard in self._connection_states:
        bucket = str(shard.shard_id % self.max_start_concurrency)
        shard_buckets[bucket].append(shard)

    for bucket in shard_buckets.values():
        for shard in bucket:
            logger.debug(f"Starting {shard.shard_id}")
            start = time.perf_counter()
            tasks.append(asyncio.create_task(shard.start()))

            if self.max_start_concurrency == 1:
                # connection ratelimiting when discord has asked for one connection concurrently
                # noinspection PyProtectedMember
                await shard._shard_ready.wait()
                await asyncio.sleep(5.1 - (time.perf_counter() - start))

        # wait for shards to finish starting
        # noinspection PyProtectedMember
        await asyncio.gather(*[shard._shard_ready.wait() for shard in self._connection_states])

    try:
        await asyncio.gather(*tasks)
    finally:
        await self.stop()

async method login(self, token)

Login to discord via http.

Note

You will need to run Naff.start_gateway() before you start receiving gateway events.

Parameters:

Name Type Description Default
token str

Your bot's token

required
Source code in naff/client/auto_shard_client.py
async def login(self, token) -> None:
    """
    Login to discord via http.

    !!! note
        You will need to run Naff.start_gateway() before you start receiving gateway events.

    Args:
        token str: Your bot's token

    """
    await super().login(token)
    data = await self.http.get_gateway_bot()

    self.max_start_concurrency = data["session_start_limit"]["max_concurrency"]
    if self.auto_sharding:
        self.total_shards = data["shards"]
    elif data["shards"] != self.total_shards:
        recommended_shards = data["shards"]
        logger.info(
            f"Discord recommends you start with {recommended_shards} shard{'s' if recommended_shards != 1 else ''} instead of {self.total_shards}"
        )

    logger.debug(f"Starting bot with {self.total_shards} shard{'s' if self.total_shards != 1 else ''}")
    self._connection_states: list[ConnectionState] = [
        ConnectionState(self, self.intents, shard_id) for shard_id in range(self.total_shards)
    ]

inherited property readonly is_closed: bool

Returns True if the bot has closed.

inherited property readonly is_ready: bool

Returns True if the bot is ready.

inherited property readonly average_latency: float

Returns the average latency of the websocket connection.

inherited property readonly start_time: datetime

The start time of the bot.

inherited property readonly user: NaffUser

Returns the bot's user.

inherited property readonly app: Application

Returns the bots application.

inherited property readonly owner: Optional[User]

Returns the bot's owner'.

inherited property readonly owners: List[User]

Returns the bot's owners as declared via client.owner_ids.

inherited property readonly guilds: List[Guild]

Returns a list of all guilds the bot is in.

inherited property readonly status: Status

Get the status of the bot.

IE online, afk, dnd

inherited property readonly activity: Activity

Get the activity of the bot.

inherited property readonly application_commands: List[naff.models.naff.application_commands.InteractionCommand]

A list of all application commands registered within the bot.

inherited property readonly ws: GatewayClient

Returns the websocket client.

async inherited method generate_prefixes(self, bot, message)

A method to get the bot's default_prefix, can be overridden to add dynamic prefixes.

Note

To easily override this method, simply use the generate_prefixes parameter when instantiating the client

Parameters:

Name Type Description Default
bot Client

A reference to the client

required
message Message

A message to determine the prefix from.

required

Returns:

Type Description
str | collections.abc.Iterable[str]

A string or an iterable of strings to use as a prefix. By default, this will return client.default_prefix

Source code in naff/client/auto_shard_client.py
async def generate_prefixes(self, bot: "Client", message: Message) -> str | Iterable[str]:
    """
    A method to get the bot's default_prefix, can be overridden to add dynamic prefixes.

    !!! note
        To easily override this method, simply use the `generate_prefixes` parameter when instantiating the client

    Args:
        bot: A reference to the client
        message: A message to determine the prefix from.

    Returns:
        A string or an iterable of strings to use as a prefix. By default, this will return `client.default_prefix`

    """
    return self.default_prefix

inherited method default_error_handler(source, error)

The default error logging behaviour.

Parameters:

Name Type Description Default
source str

The source of this error

required
error BaseException

The exception itself

required
Source code in naff/client/auto_shard_client.py
@staticmethod
def default_error_handler(source: str, error: BaseException) -> None:
    """
    The default error logging behaviour.

    Args:
        source: The source of this error
        error: The exception itself

    """
    out = traceback.format_exception(error)

    if isinstance(error, HTTPException):
        # HTTPException's are of 3 known formats, we can parse them for human readable errors
        try:
            errors = error.search_for_message(error.errors)
            out = f"HTTPException: {error.status}|{error.response.reason}: " + "\n".join(errors)
        except Exception:  # noqa : S110
            pass

    logger.error(
        "Ignoring exception in {}:{}{}".format(source, "\n" if len(out) > 1 else " ", "".join(out)),
    )

async inherited method on_error(self, source, error, *args, **kwargs)

Catches all errors dispatched by the library.

By default it will format and print them to console

Override this to change error handling behaviour

Source code in naff/client/auto_shard_client.py
async def on_error(self, source: str, error: Exception, *args, **kwargs) -> None:
    """
    Catches all errors dispatched by the library.

    By default it will format and print them to console

    Override this to change error handling behaviour

    """
    self.default_error_handler(source, error)

async inherited method on_command_error(self, ctx, error, *args, **kwargs)

Catches all errors dispatched by commands.

By default it will call Client.on_error

Override this to change error handling behavior

Source code in naff/client/auto_shard_client.py
async def on_command_error(self, ctx: Context, error: Exception, *args, **kwargs) -> None:
    """
    Catches all errors dispatched by commands.

    By default it will call `Client.on_error`

    Override this to change error handling behavior

    """
    self.dispatch(events.Error(f"cmd /`{ctx.invoke_target}`", error, args, kwargs, ctx))
    try:
        if isinstance(error, errors.CommandOnCooldown):
            await ctx.send(
                embeds=Embed(
                    description=f"This command is on cooldown!\n"
                    f"Please try again in {int(error.cooldown.get_cooldown_time())} seconds",
                    color=BrandColors.FUCHSIA,
                )
            )
        elif isinstance(error, errors.MaxConcurrencyReached):
            await ctx.send(
                embeds=Embed(
                    description="This command has reached its maximum concurrent usage!\n"
                    "Please try again shortly.",
                    color=BrandColors.FUCHSIA,
                )
            )
        elif isinstance(error, errors.CommandCheckFailure):
            await ctx.send(
                embeds=Embed(
                    description="You do not have permission to run this command!",
                    color=BrandColors.YELLOW,
                )
            )
        elif self.send_command_tracebacks:
            out = "".join(traceback.format_exception(error))
            if self.http.token is not None:
                out = out.replace(self.http.token, "[REDACTED TOKEN]")
            await ctx.send(
                embeds=Embed(
                    title=f"Error: {type(error).__name__}",
                    color=BrandColors.RED,
                    description=f"```\n{out[:EMBED_MAX_DESC_LENGTH-8]}```",
                )
            )
    except errors.NaffException:
        pass

async inherited method on_command(self, ctx)

Called after any command is ran.

By default, it will simply log the command, override this to change that behaviour

Parameters:

Name Type Description Default
ctx Context

The context of the command that was called

required
Source code in naff/client/auto_shard_client.py
async def on_command(self, ctx: Context) -> None:
    """
    Called *after* any command is ran.

    By default, it will simply log the command, override this to change that behaviour

    Args:
        ctx: The context of the command that was called

    """
    if isinstance(ctx, PrefixedContext):
        symbol = "@"
    elif isinstance(ctx, InteractionContext):
        symbol = "/"
    else:
        symbol = "?"  # likely custom context
    logger.info(f"Command Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")

async inherited method on_component_error(self, ctx, error, *args, **kwargs)

Catches all errors dispatched by components.

By default it will call Naff.on_error

Override this to change error handling behavior

Source code in naff/client/auto_shard_client.py
async def on_component_error(self, ctx: ComponentContext, error: Exception, *args, **kwargs) -> None:
    """
    Catches all errors dispatched by components.

    By default it will call `Naff.on_error`

    Override this to change error handling behavior

    """
    return self.dispatch(events.Error(f"Component Callback for {ctx.custom_id}", error, args, kwargs, ctx))

async inherited method on_component(self, ctx)

Called after any component callback is ran.

By default, it will simply log the component use, override this to change that behaviour

Parameters:

Name Type Description Default
ctx ComponentContext

The context of the component that was called

required
Source code in naff/client/auto_shard_client.py
async def on_component(self, ctx: ComponentContext) -> None:
    """
    Called *after* any component callback is ran.

    By default, it will simply log the component use, override this to change that behaviour

    Args:
        ctx: The context of the component that was called

    """
    symbol = "¢"
    logger.info(f"Component Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")

async inherited method on_autocomplete_error(self, ctx, error, *args, **kwargs)

Catches all errors dispatched by autocompletion options.

By default it will call Naff.on_error

Override this to change error handling behavior

Source code in naff/client/auto_shard_client.py
async def on_autocomplete_error(self, ctx: AutocompleteContext, error: Exception, *args, **kwargs) -> None:
    """
    Catches all errors dispatched by autocompletion options.

    By default it will call `Naff.on_error`

    Override this to change error handling behavior

    """
    return self.dispatch(
        events.Error(
            f"Autocomplete Callback for /{ctx.invoke_target} - Option: {ctx.focussed_option}",
            error,
            args,
            kwargs,
            ctx,
        )
    )

async inherited method on_autocomplete(self, ctx)

Called after any autocomplete callback is ran.

By default, it will simply log the autocomplete callback, override this to change that behaviour

Parameters:

Name Type Description Default
ctx AutocompleteContext

The context of the command that was called

required
Source code in naff/client/auto_shard_client.py
async def on_autocomplete(self, ctx: AutocompleteContext) -> None:
    """
    Called *after* any autocomplete callback is ran.

    By default, it will simply log the autocomplete callback, override this to change that behaviour

    Args:
        ctx: The context of the command that was called

    """
    symbol = "$"
    logger.info(f"Autocomplete Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")

inherited method start(self, token)

Start the bot.

Info

This is the recommended method to start the bot

Parameters:

Name Type Description Default
token

Your bot's token

required
Source code in naff/client/auto_shard_client.py
def start(self, token) -> None:
    """
    Start the bot.

    info:
        This is the recommended method to start the bot

    Args:
        token: Your bot's token

    """
    try:
        asyncio.run(self.astart(token))
    except KeyboardInterrupt:
        # ignore, cus this is useless and can be misleading to the
        # user
        pass

async inherited method start_gateway(self)

Starts the gateway connection.

Source code in naff/client/auto_shard_client.py
async def start_gateway(self) -> None:
    """Starts the gateway connection."""
    try:
        await self._connection_state.start()
    finally:
        await self.stop()

inherited method dispatch(self, event, *args, **kwargs)

Dispatch an event.

Parameters:

Name Type Description Default
event BaseEvent

The event to be dispatched.

required
Source code in naff/client/auto_shard_client.py
def dispatch(self, event: events.BaseEvent, *args, **kwargs) -> None:
    """
    Dispatch an event.

    Args:
        event: The event to be dispatched.

    """
    listeners = self.listeners.get(event.resolved_name, [])
    if listeners:
        logger.debug(f"Dispatching Event: {event.resolved_name}")
        event.bot = self
        for _listen in listeners:
            try:
                self._queue_task(_listen, event, *args, **kwargs)
            except Exception as e:
                raise BotException(
                    f"An error occurred attempting during {event.resolved_name} event processing"
                ) from e

    _waits = self.waits.get(event.resolved_name, [])
    if _waits:
        index_to_remove = []
        for i, _wait in enumerate(_waits):
            result = _wait(event)
            if result:
                index_to_remove.append(i)

        for idx in sorted(index_to_remove, reverse=True):
            _waits.pop(idx)

async inherited method wait_until_ready(self)

Waits for the client to become ready.

Source code in naff/client/auto_shard_client.py
async def wait_until_ready(self) -> None:
    """Waits for the client to become ready."""
    await self._ready.wait()

inherited method wait_for(self, event, checks, timeout)

Waits for a WebSocket event to be dispatched.

Parameters:

Name Type Description Default
event Union[str, BaseEvent]

The name of event to wait.

required
checks Union[Callable[..., bool], NoneType, naff.client.const.Missing]

A predicate to check what to wait for.

Missing
timeout Optional[float]

The number of seconds to wait before timing out.

None

Returns:

Type Description
Any

The event object.

Source code in naff/client/auto_shard_client.py
def wait_for(
    self,
    event: Union[str, "BaseEvent"],
    checks: Absent[Optional[Callable[..., bool]]] = MISSING,
    timeout: Optional[float] = None,
) -> Any:
    """
    Waits for a WebSocket event to be dispatched.

    Args:
        event: The name of event to wait.
        checks: A predicate to check what to wait for.
        timeout: The number of seconds to wait before timing out.

    Returns:
        The event object.

    """
    event = get_event_name(event)

    if event not in self.waits:
        self.waits[event] = []

    future = asyncio.Future()
    self.waits[event].append(Wait(event, checks, future))

    return asyncio.wait_for(future, timeout)

async inherited method wait_for_modal(self, modal, author, timeout)

Wait for a modal response.

Parameters:

Name Type Description Default
modal Modal

The modal we're waiting for.

required
author Optional[Snowflake_Type]

The user we're waiting for to reply

None
timeout Optional[float]

A timeout in seconds to stop waiting

None

Returns:

Type Description
ModalContext

The context of the modal response

Source code in naff/client/auto_shard_client.py
async def wait_for_modal(
    self,
    modal: "Modal",
    author: Optional["Snowflake_Type"] = None,
    timeout: Optional[float] = None,
) -> ModalContext:
    """
    Wait for a modal response.

    Args:
        modal: The modal we're waiting for.
        author: The user we're waiting for to reply
        timeout: A timeout in seconds to stop waiting

    Returns:
        The context of the modal response

    Raises:
        `asyncio.TimeoutError` if no response is received that satisfies the predicate before timeout seconds have passed

    """
    author = to_snowflake(author) if author else None

    def predicate(event) -> bool:
        if modal.custom_id != event.context.custom_id:
            return False
        if author and author != to_snowflake(event.context.author):
            return False
        return True

    resp = await self.wait_for("modal_response", predicate, timeout)
    return resp.context

async inherited method wait_for_component(self, messages, components, check, timeout)

Waits for a component to be sent to the bot.

Parameters:

Name Type Description Default
messages Union[naff.models.discord.message.Message, int, list]

The message object to check for.

None
components Union[List[List[Union[BaseComponent, dict]]], List[Union[BaseComponent, dict]], BaseComponent, dict]

The components to wait for.

None
check Optional[Callable]

A predicate to check what to wait for.

None
timeout Optional[float]

The number of seconds to wait before timing out.

None

Returns:

Type Description
Component

Component that was invoked. Use .context to get the ComponentContext.

Source code in naff/client/auto_shard_client.py
async def wait_for_component(
    self,
    messages: Union[Message, int, list] = None,
    components: Optional[
        Union[List[List[Union["BaseComponent", dict]]], List[Union["BaseComponent", dict]], "BaseComponent", dict]
    ] = None,
    check: Optional[Callable] = None,
    timeout: Optional[float] = None,
) -> "Component":
    """
    Waits for a component to be sent to the bot.

    Args:
        messages: The message object to check for.
        components: The components to wait for.
        check: A predicate to check what to wait for.
        timeout: The number of seconds to wait before timing out.

    Returns:
        `Component` that was invoked. Use `.context` to get the `ComponentContext`.

    Raises:
        `asyncio.TimeoutError` if timed out

    """
    if not (messages or components):
        raise ValueError("You must specify messages or components (or both)")

    message_ids = (
        to_snowflake_list(messages) if isinstance(messages, list) else to_snowflake(messages) if messages else None
    )
    custom_ids = list(get_components_ids(components)) if components else None

    # automatically convert improper custom_ids
    if custom_ids and not all(isinstance(x, str) for x in custom_ids):
        custom_ids = [str(i) for i in custom_ids]

    def _check(event: Component) -> bool:
        ctx: ComponentContext = event.context
        # if custom_ids is empty or there is a match
        wanted_message = not message_ids or ctx.message.id in (
            [message_ids] if isinstance(message_ids, int) else message_ids
        )
        wanted_component = not custom_ids or ctx.custom_id in custom_ids
        if wanted_message and wanted_component:
            if check is None or check(event):
                return True
            return False
        return False

    return await self.wait_for("component", checks=_check, timeout=timeout)

inherited method listen(self, event_name)

A decorator to be used in situations that Naff can't automatically hook your listeners. Ideally, the standard listen decorator should be used, not this.

Parameters:

Name Type Description Default
event_name Union[str, naff.client.const.Missing]

The event name to use, if not the coroutine name

Missing

Returns:

Type Description
Listener

A listener that can be used to hook into the event.

Source code in naff/client/auto_shard_client.py
def listen(self, event_name: Absent[str] = MISSING) -> Listener:
    """
    A decorator to be used in situations that Naff can't automatically hook your listeners. Ideally, the standard listen decorator should be used, not this.

    Args:
        event_name: The event name to use, if not the coroutine name

    Returns:
        A listener that can be used to hook into the event.

    """

    def wrapper(coro: Callable[..., Coroutine]) -> Listener:
        listener = Listener.create(event_name)(coro)
        self.add_listener(listener)
        return listener

    return wrapper

inherited method add_event_processor(self, event_name)

A decorator to be used to add event processors.

Parameters:

Name Type Description Default
event_name Union[str, naff.client.const.Missing]

The event name to use, if not the coroutine name

Missing

Returns:

Type Description
Callable[..., Coroutine]

A function that can be used to hook into the event.

Source code in naff/client/auto_shard_client.py
def add_event_processor(self, event_name: Absent[str] = MISSING) -> Callable[..., Coroutine]:
    """
    A decorator to be used to add event processors.

    Args:
        event_name: The event name to use, if not the coroutine name

    Returns:
        A function that can be used to hook into the event.

    """

    def wrapper(coro: Callable[..., Coroutine]) -> Callable[..., Coroutine]:
        name = event_name
        if name is MISSING:
            name = coro.__name__
        name = name.lstrip("_")
        name = name.removeprefix("on_")
        self.processors[name] = coro
        return coro

    return wrapper

inherited method add_listener(self, listener)

Add a listener for an event, if no event is passed, one is determined.

Parameters:

Name Type Description Default
listener Listener

The listener to add to the client

required
Source code in naff/client/auto_shard_client.py
def add_listener(self, listener: Listener) -> None:
    """
    Add a listener for an event, if no event is passed, one is determined.

    Args:
        listener Listener: The listener to add to the client

    """
    # check that the required intents are enabled
    event_class_name = "".join([name.capitalize() for name in listener.event.split("_")])
    if event_class := globals().get(event_class_name):
        if required_intents := _INTENT_EVENTS.get(event_class):  # noqa
            if not any(required_intent in self.intents for required_intent in required_intents):
                self.logger.warning(
                    f"Event `{listener.event}` will not work since the required intent is not set -> Requires any of: `{required_intents}`"
                )

    if listener.event not in self.listeners:
        self.listeners[listener.event] = []
    self.listeners[listener.event].append(listener)

inherited method add_interaction(self, command)

Add a slash command to the client.

Parameters:

Name Type Description Default
command InteractionCommand

The command to add

required
Source code in naff/client/auto_shard_client.py
def add_interaction(self, command: InteractionCommand) -> bool:
    """
    Add a slash command to the client.

    Args:
        command InteractionCommand: The command to add

    """
    if self.debug_scope:
        command.scopes = [self.debug_scope]

    # for SlashCommand objs without callback (like objects made to hold group info etc)
    if command.callback is None:
        return False

    for scope in command.scopes:
        if scope not in self.interactions:
            self.interactions[scope] = {}
        elif command.resolved_name in self.interactions[scope]:
            old_cmd = self.interactions[scope][command.resolved_name]
            raise ValueError(f"Duplicate Command! {scope}::{old_cmd.resolved_name}")

        if self.enforce_interaction_perms:
            command.checks.append(command._permission_enforcer)  # noqa : w0212

        self.interactions[scope][command.resolved_name] = command

    return True

inherited method add_prefixed_command(self, command)

Add a prefixed command to the client.

Parameters:

Name Type Description Default
command PrefixedCommand

The command to add

required
Source code in naff/client/auto_shard_client.py
def add_prefixed_command(self, command: PrefixedCommand) -> None:
    """
    Add a prefixed command to the client.

    Args:
        command PrefixedCommand: The command to add

    """
    # check that the required intent is enabled or the prefix is a mention
    prefixes = (
        self.default_prefix
        if not isinstance(self.default_prefix, str) and not self.default_prefix == MENTION_PREFIX
        else (self.default_prefix,)
    )
    if (MENTION_PREFIX not in prefixes) and (Intents.GUILD_MESSAGE_CONTENT not in self.intents):
        self.logger.warning(
            f"Prefixed commands will not work since the required intent is not set -> Requires: `{Intents.GUILD_MESSAGE_CONTENT.__repr__()}` or usage of the default `MENTION_PREFIX` as the prefix"
        )

    command._parse_parameters()

    if self.prefixed_commands.get(command.name):
        raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {command.name}.")
    self.prefixed_commands[command.name] = command

    for alias in command.aliases:
        if self.prefixed_commands.get(alias):
            raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {alias}.")
        self.prefixed_commands[alias] = command

inherited method add_component_callback(self, command)

Add a component callback to the client.

Parameters:

Name Type Description Default
command ComponentCommand

The command to add

required
Source code in naff/client/auto_shard_client.py
def add_component_callback(self, command: ComponentCommand) -> None:
    """
    Add a component callback to the client.

    Args:
        command: The command to add

    """
    for listener in command.listeners:
        # I know this isn't an ideal solution, but it means we can lookup callbacks with O(1)
        if listener not in self._component_callbacks.keys():
            self._component_callbacks[listener] = command
            continue
        else:
            raise ValueError(f"Duplicate Component! Multiple component callbacks for `{listener}`")

inherited method add_modal_callback(self, command)

Add a modal callback to the client.

Parameters:

Name Type Description Default
command ModalCommand

The command to add

required
Source code in naff/client/auto_shard_client.py
def add_modal_callback(self, command: ModalCommand) -> None:
    """
    Add a modal callback to the client.

    Args:
        command: The command to add
    """
    for listener in command.listeners:
        if listener not in self._modal_callbacks.keys():
            self._modal_callbacks[listener] = command
            continue
        else:
            raise ValueError(f"Duplicate Component! Multiple modal callbacks for `{listener}`")

async inherited method synchronise_interactions(self, *, scopes, delete_commands)

Synchronise registered interactions with discord.

Parameters:

Name Type Description Default
scopes list

Optionally specify which scopes are to be synced

Missing
delete_commands Union[bool, naff.client.const.Missing]

Override the client setting and delete commands

Missing
Source code in naff/client/auto_shard_client.py
async def synchronise_interactions(
    self, *, scopes: list["Snowflake_Type"] = MISSING, delete_commands: Absent[bool] = MISSING
) -> None:
    """
    Synchronise registered interactions with discord.

    Args:
        scopes: Optionally specify which scopes are to be synced
        delete_commands: Override the client setting and delete commands
    """
    s = time.perf_counter()
    _delete_cmds = self.del_unused_app_cmd if delete_commands is MISSING else delete_commands
    await self._cache_interactions()

    if scopes is not MISSING:
        cmd_scopes = scopes
    elif self.del_unused_app_cmd:
        # if we're deleting unused commands, we check all scopes
        cmd_scopes = [to_snowflake(g_id) for g_id in self._user._guild_ids] + [GLOBAL_SCOPE]
    else:
        # if we're not deleting, just check the scopes we have cmds registered in
        cmd_scopes = list(set(self.interactions) | {GLOBAL_SCOPE})

    local_cmds_json = application_commands_to_dict(self.interactions)

    async def sync_scope(cmd_scope) -> None:

        sync_needed_flag = False  # a flag to force this scope to synchronise
        sync_payload = []  # the payload to be pushed to discord

        try:
            try:
                remote_commands = await self.http.get_application_commands(self.app.id, cmd_scope)
            except Forbidden:
                logger.warning(f"Bot is lacking `application.commands` scope in {cmd_scope}!")
                return

            for local_cmd in self.interactions.get(cmd_scope, {}).values():
                # get remote equivalent of this command
                remote_cmd_json = next(
                    (v for v in remote_commands if int(v["id"]) == local_cmd.cmd_id.get(cmd_scope)), None
                )
                # get json representation of this command
                local_cmd_json = next((c for c in local_cmds_json[cmd_scope] if c["name"] == str(local_cmd.name)))

                # this works by adding any command we *want* on Discord, to a payload, and synchronising that
                # this allows us to delete unused commands, add new commands, or do nothing in 1 or less API calls

                if sync_needed(local_cmd_json, remote_cmd_json):
                    # determine if the local and remote commands are out-of-sync
                    sync_needed_flag = True
                    sync_payload.append(local_cmd_json)
                elif not _delete_cmds and remote_cmd_json:
                    _remote_payload = {
                        k: v for k, v in remote_cmd_json.items() if k not in ("id", "application_id", "version")
                    }
                    sync_payload.append(_remote_payload)
                elif _delete_cmds:
                    sync_payload.append(local_cmd_json)

            sync_payload = [json.loads(_dump) for _dump in {json.dumps(_cmd) for _cmd in sync_payload}]

            if sync_needed_flag or (_delete_cmds and len(sync_payload) < len(remote_commands)):
                # synchronise commands if flag is set, or commands are to be deleted
                logger.info(f"Overwriting {cmd_scope} with {len(sync_payload)} application commands")
                sync_response: list[dict] = await self.http.overwrite_application_commands(
                    self.app.id, sync_payload, cmd_scope
                )
                self._cache_sync_response(sync_response, cmd_scope)
            else:
                logger.debug(f"{cmd_scope} is already up-to-date with {len(remote_commands)} commands.")

        except Forbidden as e:
            raise InteractionMissingAccess(cmd_scope) from e
        except HTTPException as e:
            self._raise_sync_exception(e, local_cmds_json, cmd_scope)

    await asyncio.gather(*[sync_scope(scope) for scope in cmd_scopes])

    t = time.perf_counter() - s
    logger.debug(f"Sync of {len(cmd_scopes)} scopes took {t} seconds")

inherited method get_application_cmd_by_id(self, cmd_id)

Get a application command from the internal cache by its ID.

Parameters:

Name Type Description Default
cmd_id Snowflake_Type

The ID of the command

required

Returns:

Type Description
Optional[naff.models.naff.application_commands.InteractionCommand]

The command, if one with the given ID exists internally, otherwise None

Source code in naff/client/auto_shard_client.py
def get_application_cmd_by_id(self, cmd_id: "Snowflake_Type") -> Optional[InteractionCommand]:
    """
    Get a application command from the internal cache by its ID.

    Args:
        cmd_id: The ID of the command

    Returns:
        The command, if one with the given ID exists internally, otherwise None

    """
    scope = self._interaction_scopes.get(str(cmd_id), MISSING)
    cmd_id = int(cmd_id)  # ensure int ID
    if scope != MISSING:
        for cmd in self.interactions[scope].values():
            if int(cmd.cmd_id.get(scope)) == cmd_id:
                return cmd
    return None

async inherited method get_context(self, data, interaction)

Return a context object based on data passed.

Note

If you want to use custom context objects, this is the method to override. Your replacement must take the same arguments as this, and return a Context-like object.

Parameters:

Name Type Description Default
data Union[discord_typings.interactions.receiving.ApplicationCommandGuildInteractionData, discord_typings.interactions.receiving.ApplicationCommandChannelInteractionData, discord_typings.interactions.receiving.GuildUserCommandInteractionData, discord_typings.interactions.receiving.ChannelUserCommandInteractionData, discord_typings.interactions.receiving.ComponentGuildInteractionData, discord_typings.interactions.receiving.ComponentChannelInteractionData, discord_typings.interactions.receiving.AutocompleteGuildInteractionData, discord_typings.interactions.receiving.AutocompleteChannelInteractionData, discord_typings.interactions.receiving.ModalGuildInteractionData, discord_typings.interactions.receiving.ModalChannelInteractionData, dict, naff.models.discord.message.Message]

The data of the event

required
interaction bool

Is this an interaction or not?

False

Returns:

Type Description
naff.models.naff.context.ComponentContext | naff.models.naff.context.AutocompleteContext | naff.models.naff.context.ModalContext | naff.models.naff.context.InteractionContext | naff.models.naff.context.PrefixedContext

Context object

Source code in naff/client/auto_shard_client.py
async def get_context(
    self, data: InteractionData | dict | Message, interaction: bool = False
) -> ComponentContext | AutocompleteContext | ModalContext | InteractionContext | PrefixedContext:
    """
    Return a context object based on data passed.

    note:
        If you want to use custom context objects, this is the method to override. Your replacement must take the same arguments as this, and return a Context-like object.

    Args:
        data: The data of the event
        interaction: Is this an interaction or not?

    Returns:
        Context object

    """
    # this line shuts up IDE warnings
    cls: ComponentContext | AutocompleteContext | ModalContext | InteractionContext | PrefixedContext

    if interaction:
        match data["type"]:
            case InteractionTypes.MESSAGE_COMPONENT:
                cls = self.component_context.from_dict(data, self)

            case InteractionTypes.AUTOCOMPLETE:
                cls = self.autocomplete_context.from_dict(data, self)

            case InteractionTypes.MODAL_RESPONSE:
                cls = self.modal_context.from_dict(data, self)

            case _:
                cls = self.interaction_context.from_dict(data, self)

        if not cls.channel:
            try:
                cls.channel = await self.cache.fetch_channel(data["channel_id"])
            except Forbidden:
                cls.channel = BaseChannel.from_dict_factory(
                    {"id": data["channel_id"], "type": ChannelTypes.GUILD_TEXT}, self
                )

    else:
        cls = self.prefixed_context.from_message(self, data)
        if not cls.channel:
            cls.channel = await self.cache.fetch_channel(data._channel_id)

    return cls

inherited method get_extensions(self, name)

Get all ext with a name or extension name.

Parameters:

Name Type Description Default
name str

The name of the extension, or the name of it's extension

required

Returns:

Type Description
list

List of Extensions

Source code in naff/client/auto_shard_client.py
def get_extensions(self, name: str) -> list[Extension]:
    """
    Get all ext with a name or extension name.

    Args:
        name: The name of the extension, or the name of it's extension

    Returns:
        List of Extensions
    """
    if name not in self.ext.keys():
        return [ext for ext in self.ext.values() if ext.extension_name == name]

    return [self.ext.get(name, None)]

inherited method get_ext(self, name)

Get a extension with a name or extension name.

Parameters:

Name Type Description Default
name str

The name of the extension, or the name of it's extension

required

Returns:

Type Description
naff.models.naff.extension.Extension | None

A extension, if found

Source code in naff/client/auto_shard_client.py
def get_ext(self, name: str) -> Extension | None:
    """
    Get a extension with a name or extension name.

    Args:
        name: The name of the extension, or the name of it's extension

    Returns:
        A extension, if found
    """
    if ext := self.get_extensions(name):
        return ext[0]
    return None

inherited method load_extension(self, name, package, **load_kwargs)

Load an extension with given arguments.

Parameters:

Name Type Description Default
name str

The name of the extension.

required
package str

The package the extension is in

None
load_kwargs

The auto-filled mapping of the load keyword arguments

{}
Source code in naff/client/auto_shard_client.py
def load_extension(self, name: str, package: str = None, **load_kwargs) -> None:
    """
    Load an extension with given arguments.

    Args:
        name: The name of the extension.
        package: The package the extension is in
        load_kwargs: The auto-filled mapping of the load keyword arguments

    """
    name = importlib.util.resolve_name(name, package)
    if name in self.__modules:
        raise Exception(f"{name} already loaded")

    module = importlib.import_module(name, package)
    try:
        setup = getattr(module, "setup", None)
        if not setup:
            raise ExtensionLoadException(
                f"{name} lacks an entry point. Ensure you have a function called `setup` defined in that file"
            ) from None
        setup(self, **load_kwargs)
    except ExtensionLoadException:
        raise
    except Exception as e:
        del sys.modules[name]
        raise ExtensionLoadException(f"Unexpected Error loading {name}") from e

    else:
        logger.debug(f"Loaded Extension: {name}")
        self.__modules[name] = module

        if self.sync_ext and self._ready.is_set():
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                return
            asyncio.create_task(self.synchronise_interactions())

inherited method unload_extension(self, name, package, **unload_kwargs)

Unload an extension with given arguments.

Parameters:

Name Type Description Default
name

The name of the extension.

required
package

The package the extension is in

None
unload_kwargs

The auto-filled mapping of the unload keyword arguments

{}
Source code in naff/client/auto_shard_client.py
def unload_extension(self, name, package=None, **unload_kwargs) -> None:
    """
    Unload an extension with given arguments.

    Args:
        name: The name of the extension.
        package: The package the extension is in
        unload_kwargs: The auto-filled mapping of the unload keyword arguments

    """
    name = importlib.util.resolve_name(name, package)
    module = self.__modules.get(name)

    if module is None:
        raise ExtensionNotFound(f"No extension called {name} is loaded")

    try:
        teardown = getattr(module, "teardown")
        teardown(**unload_kwargs)
    except AttributeError:
        pass

    for ext in self.get_extensions(name):
        ext.drop(**unload_kwargs)

    del sys.modules[name]
    del self.__modules[name]

    if self.sync_ext and self._ready.is_set():
        if self.sync_ext and self._ready.is_set():
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                return
            asyncio.create_task(self.synchronise_interactions())

inherited method reload_extension(self, name, package, *, load_kwargs, unload_kwargs)

Helper method to reload an extension. Simply unloads, then loads the extension with given arguments.

Parameters:

Name Type Description Default
name

The name of the extension.

required
package

The package the extension is in

None
load_kwargs Mapping[str, Any]

The manually-filled mapping of the load keyword arguments

None
unload_kwargs Mapping[str, Any]

The manually-filled mapping of the unload keyword arguments

None
Source code in naff/client/auto_shard_client.py
def reload_extension(
    self, name, package=None, *, load_kwargs: Mapping[str, Any] = None, unload_kwargs: Mapping[str, Any] = None
) -> None:
    """
    Helper method to reload an extension. Simply unloads, then loads the extension with given arguments.

    Args:
        name: The name of the extension.
        package: The package the extension is in
        load_kwargs: The manually-filled mapping of the load keyword arguments
        unload_kwargs: The manually-filled mapping of the unload keyword arguments

    """
    name = importlib.util.resolve_name(name, package)
    module = self.__modules.get(name)

    if module is None:
        logger.warning("Attempted to reload extension thats not loaded. Loading extension instead")
        return self.load_extension(name, package)

    if not load_kwargs:
        load_kwargs = {}
    if not unload_kwargs:
        unload_kwargs = {}

    self.unload_extension(name, package, **unload_kwargs)
    self.load_extension(name, package, **load_kwargs)

    # todo: maybe add an ability to revert to the previous version if unable to load the new one

async inherited method fetch_guild(self, guild_id)

Fetch a guild.

Note

This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.

Parameters:

Name Type Description Default
guild_id Snowflake_Type

The ID of the guild to get

required

Returns:

Type Description
Optional[naff.models.discord.guild.Guild]

Guild Object if found, otherwise None

Source code in naff/client/auto_shard_client.py
async def fetch_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
    """
    Fetch a guild.

    Note:
        This method is an alias for the cache which will either return a cached object, or query discord for the object
        if its not already cached.

    Args:
        guild_id: The ID of the guild to get

    Returns:
        Guild Object if found, otherwise None

    """
    try:
        return await self.cache.fetch_guild(guild_id)
    except NotFound:
        return None

inherited method get_guild(self, guild_id)

Get a guild.

Note

This method is an alias for the cache which will return a cached object.

Parameters:

Name Type Description Default
guild_id Snowflake_Type

The ID of the guild to get

required

Returns:

Type Description
Optional[naff.models.discord.guild.Guild]

Guild Object if found, otherwise None

Source code in naff/client/auto_shard_client.py
def get_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
    """
    Get a guild.

    Note:
        This method is an alias for the cache which will return a cached object.

    Args:
        guild_id: The ID of the guild to get

    Returns:
        Guild Object if found, otherwise None

    """
    return self.cache.get_guild(guild_id)

async inherited method create_guild_from_template(self, template_code, name, icon)

Creates a new guild based on a template.

Note

This endpoint can only be used by bots in less than 10 guilds.

Parameters:

Name Type Description Default
template_code Union[GuildTemplate, str]

The code of the template to use.

required
name str

The name of the guild (2-100 characters)

required
icon Union[naff.models.discord.file.File, io.IOBase, BinaryIO, pathlib.Path, str, naff.client.const.Missing]

Location or File of icon to set

Missing

Returns:

Type Description
Optional[naff.models.discord.guild.Guild]

The newly created guild object

Source code in naff/client/auto_shard_client.py
async def create_guild_from_template(
    self,
    template_code: Union["GuildTemplate", str],
    name: str,
    icon: Absent[UPLOADABLE_TYPE] = MISSING,
) -> Optional[Guild]:
    """
    Creates a new guild based on a template.

    note:
        This endpoint can only be used by bots in less than 10 guilds.

    Args:
        template_code: The code of the template to use.
        name: The name of the guild (2-100 characters)
        icon: Location or File of icon to set

    Returns:
        The newly created guild object

    """
    if isinstance(template_code, GuildTemplate):
        template_code = template_code.code

    if icon:
        icon = to_image_data(icon)
    guild_data = await self.http.create_guild_from_guild_template(template_code, name, icon)
    return Guild.from_dict(guild_data, self)

async inherited method fetch_channel(self, channel_id)

Fetch a channel.

Note

This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.

Parameters:

Name Type Description Default
channel_id Snowflake_Type

The ID of the channel to get

required

Returns:

Type Description
Optional[TYPE_ALL_CHANNEL]

Channel Object if found, otherwise None

Source code in naff/client/auto_shard_client.py
async def fetch_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
    """
    Fetch a channel.

    Note:
        This method is an alias for the cache which will either return a cached object, or query discord for the object
        if its not already cached.

    Args:
        channel_id: The ID of the channel to get

    Returns:
        Channel Object if found, otherwise None

    """
    try:
        return await self.cache.fetch_channel(channel_id)
    except NotFound:
        return None

inherited method get_channel(self, channel_id)

Get a channel.

Note

This method is an alias for the cache which will return a cached object.

Parameters:

Name Type Description Default
channel_id Snowflake_Type

The ID of the channel to get

required

Returns:

Type Description
Optional[TYPE_ALL_CHANNEL]

Channel Object if found, otherwise None

Source code in naff/client/auto_shard_client.py
def get_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
    """
    Get a channel.

    Note:
        This method is an alias for the cache which will return a cached object.

    Args:
        channel_id: The ID of the channel to get

    Returns:
        Channel Object if found, otherwise None

    """
    return self.cache.get_channel(channel_id)

async inherited method fetch_user(self, user_id)

Fetch a user.

Note

This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.

Parameters:

Name Type Description Default
user_id Snowflake_Type

The ID of the user to get

required

Returns:

Type Description
Optional[naff.models.discord.user.User]

User Object if found, otherwise None

Source code in naff/client/auto_shard_client.py
async def fetch_user(self, user_id: "Snowflake_Type") -> Optional[User]:
    """
    Fetch a user.

    Note:
        This method is an alias for the cache which will either return a cached object, or query discord for the object
        if its not already cached.

    Args:
        user_id: The ID of the user to get

    Returns:
        User Object if found, otherwise None

    """
    try:
        return await self.cache.fetch_user(user_id)
    except NotFound:
        return None

inherited method get_user(self, user_id)

Get a user.

Note

This method is an alias for the cache which will return a cached object.

Parameters:

Name Type Description Default
user_id Snowflake_Type

The ID of the user to get

required

Returns:

Type Description
Optional[naff.models.discord.user.User]

User Object if found, otherwise None

Source code in naff/client/auto_shard_client.py
def get_user(self, user_id: "Snowflake_Type") -> Optional[User]:
    """
    Get a user.

    Note:
        This method is an alias for the cache which will return a cached object.

    Args:
        user_id: The ID of the user to get

    Returns:
        User Object if found, otherwise None

    """
    return self.cache.get_user(user_id)

async inherited method fetch_member(self, user_id, guild_id)

Fetch a member from a guild.

Note

This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.

Parameters:

Name Type Description Default
user_id Snowflake_Type

The ID of the member

required
guild_id Snowflake_Type

The ID of the guild to get the member from

required

Returns:

Type Description
Optional[naff.models.discord.user.Member]

Member object if found, otherwise None

Source code in naff/client/auto_shard_client.py
async def fetch_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
    """
    Fetch a member from a guild.

    Note:
        This method is an alias for the cache which will either return a cached object, or query discord for the object
        if its not already cached.

    Args:
        user_id: The ID of the member
        guild_id: The ID of the guild to get the member from

    Returns:
        Member object if found, otherwise None

    """
    try:
        return await self.cache.fetch_member(guild_id, user_id)
    except NotFound:
        return None

inherited method get_member(self, user_id, guild_id)

Get a member from a guild.

Note

This method is an alias for the cache which will return a cached object.

Parameters:

Name Type Description Default
user_id Snowflake_Type

The ID of the member

required
guild_id Snowflake_Type

The ID of the guild to get the member from

required

Returns:

Type Description
Optional[naff.models.discord.user.Member]

Member object if found, otherwise None

Source code in naff/client/auto_shard_client.py
def get_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
    """
    Get a member from a guild.

    Note:
        This method is an alias for the cache which will return a cached object.

    Args:
        user_id: The ID of the member
        guild_id: The ID of the guild to get the member from

    Returns:
        Member object if found, otherwise None

    """
    return self.cache.get_member(guild_id, user_id)

async inherited method fetch_scheduled_event(self, guild_id, scheduled_event_id, with_user_count)

Fetch a scheduled event by id.

Parameters:

Name Type Description Default
event_id

The id of the scheduled event.

required

Returns:

Type Description
Optional[ScheduledEvent]

The scheduled event if found, otherwise None

Source code in naff/client/auto_shard_client.py
async def fetch_scheduled_event(
    self, guild_id: "Snowflake_Type", scheduled_event_id: "Snowflake_Type", with_user_count: bool = False
) -> Optional["ScheduledEvent"]:
    """
    Fetch a scheduled event by id.

    Args:
        event_id: The id of the scheduled event.

    Returns:
        The scheduled event if found, otherwise None

    """
    try:
        scheduled_event_data = await self.http.get_scheduled_event(guild_id, scheduled_event_id, with_user_count)
        return ScheduledEvent.from_dict(scheduled_event_data, self)
    except NotFound:
        return None

async inherited method fetch_custom_emoji(self, emoji_id, guild_id)

Fetch a custom emoji by id.

Parameters:

Name Type Description Default
emoji_id Snowflake_Type

The id of the custom emoji.

required
guild_id Snowflake_Type

The id of the guild the emoji belongs to.

required

Returns:

Type Description
Optional[naff.models.discord.emoji.CustomEmoji]

The custom emoji if found, otherwise None.

Source code in naff/client/auto_shard_client.py
async def fetch_custom_emoji(self, emoji_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[CustomEmoji]:
    """
    Fetch a custom emoji by id.

    Args:
        emoji_id: The id of the custom emoji.
        guild_id: The id of the guild the emoji belongs to.

    Returns:
        The custom emoji if found, otherwise None.

    """
    try:
        return await self.cache.fetch_emoji(guild_id, emoji_id)
    except NotFound:
        return None

inherited method get_custom_emoji(self, emoji_id, guild_id)

Get a custom emoji by id.

Parameters:

Name Type Description Default
emoji_id Snowflake_Type

The id of the custom emoji.

required
guild_id Optional[Snowflake_Type]

The id of the guild the emoji belongs to.

None

Returns:

Type Description
Optional[naff.models.discord.emoji.CustomEmoji]

The custom emoji if found, otherwise None.

Source code in naff/client/auto_shard_client.py
def get_custom_emoji(
    self, emoji_id: "Snowflake_Type", guild_id: Optional["Snowflake_Type"] = None
) -> Optional[CustomEmoji]:
    """
    Get a custom emoji by id.

    Args:
        emoji_id: The id of the custom emoji.
        guild_id: The id of the guild the emoji belongs to.

    Returns:
        The custom emoji if found, otherwise None.

    """
    emoji = self.cache.get_emoji(emoji_id)
    if emoji and (not guild_id or emoji._guild_id == to_snowflake(guild_id)):
        return emoji
    return None

async inherited method fetch_sticker(self, sticker_id)

Fetch a sticker by ID.

Parameters:

Name Type Description Default
sticker_id Snowflake_Type

The ID of the sticker.

required

Returns:

Type Description
Optional[naff.models.discord.sticker.Sticker]

A sticker object if found, otherwise None

Source code in naff/client/auto_shard_client.py
async def fetch_sticker(self, sticker_id: "Snowflake_Type") -> Optional[Sticker]:
    """
    Fetch a sticker by ID.

    Args:
        sticker_id: The ID of the sticker.

    Returns:
        A sticker object if found, otherwise None

    """
    try:
        sticker_data = await self.http.get_sticker(sticker_id)
        return Sticker.from_dict(sticker_data, self)
    except NotFound:
        return None

async inherited method fetch_nitro_packs(self)

List the sticker packs available to Nitro subscribers.

Returns:

Type Description
Optional[List[StickerPack]]

A list of StickerPack objects if found, otherwise returns None

Source code in naff/client/auto_shard_client.py
async def fetch_nitro_packs(self) -> Optional[List["StickerPack"]]:
    """
    List the sticker packs available to Nitro subscribers.

    Returns:
        A list of StickerPack objects if found, otherwise returns None

    """
    try:
        packs_data = await self.http.list_nitro_sticker_packs()
        return [StickerPack.from_dict(data, self) for data in packs_data]

    except NotFound:
        return None

async inherited method fetch_voice_regions(self)

List the voice regions available on Discord.

Returns:

Type Description
List[VoiceRegion]

A list of voice regions.

Source code in naff/client/auto_shard_client.py
async def fetch_voice_regions(self) -> List["VoiceRegion"]:
    """
    List the voice regions available on Discord.

    Returns:
        A list of voice regions.

    """
    regions_data = await self.http.list_voice_regions()
    regions = VoiceRegion.from_list(regions_data)
    return regions

async inherited method connect_to_vc(self, guild_id, channel_id, muted, deafened)

Connect the bot to a voice channel.

Parameters:

Name Type Description Default
guild_id Snowflake_Type

id of the guild the voice channel is in.

required
channel_id Snowflake_Type

id of the voice channel client wants to join.

required
muted bool

Whether the bot should be muted when connected.

False
deafened bool

Whether the bot should be deafened when connected.

False

Returns:

Type Description
ActiveVoiceState

The new active voice state on successfully connection.

Source code in naff/client/auto_shard_client.py
async def connect_to_vc(
    self, guild_id: "Snowflake_Type", channel_id: "Snowflake_Type", muted: bool = False, deafened: bool = False
) -> ActiveVoiceState:
    """
    Connect the bot to a voice channel.

    Args:
        guild_id: id of the guild the voice channel is in.
        channel_id: id of the voice channel client wants to join.
        muted: Whether the bot should be muted when connected.
        deafened: Whether the bot should be deafened when connected.

    Returns:
        The new active voice state on successfully connection.

    """
    return await self._connection_state.voice_connect(guild_id, channel_id, muted, deafened)

inherited method get_bot_voice_state(self, guild_id)

Get the bot's voice state for a guild.

Parameters:

Name Type Description Default
guild_id Snowflake_Type

The target guild's id.

required

Returns:

Type Description
Optional[naff.models.naff.active_voice_state.ActiveVoiceState]

The bot's voice state for the guild if connected, otherwise None.

Source code in naff/client/auto_shard_client.py
def get_bot_voice_state(self, guild_id: "Snowflake_Type") -> Optional[ActiveVoiceState]:
    """
    Get the bot's voice state for a guild.

    Args:
        guild_id: The target guild's id.

    Returns:
        The bot's voice state for the guild if connected, otherwise None.

    """
    return self._connection_state.get_voice_state(guild_id)

async inherited method change_presence(self, status, activity)

Change the bots presence.

Parameters:

Name Type Description Default
status Union[str, naff.models.discord.enums.Status]

The status for the bot to be. i.e. online, afk, etc.

<Status.ONLINE: 'online'>
activity Union[naff.models.discord.activity.Activity, str]

The activity for the bot to be displayed as doing.

None

Note:: Bots may only be playing streaming listening watching or competing, other activity types are likely to fail.

Source code in naff/client/auto_shard_client.py
async def change_presence(
    self, status: Optional[Union[str, Status]] = Status.ONLINE, activity: Optional[Union[Activity, str]] = None
) -> None:
    """
    Change the bots presence.

    Args:
        status: The status for the bot to be. i.e. online, afk, etc.
        activity: The activity for the bot to be displayed as doing.

    Note::
        Bots may only be `playing` `streaming` `listening` `watching` or `competing`, other activity types are likely to fail.

    """
    await self._connection_state.change_presence(status, activity)